package cn.doitedu.rtdw.rt_report;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author: deep as the sea
 * @Site: <a href="www.51doit.com">Duoyi Education</a>
 * @QQ: 657270652
 * @Date: 2023/2/9
 * @Desc: Learn big data at Duoyi Education.
 * Real-time dashboard report: for every hour and every brand, the 10 products
 * with the highest sales amount.
 *
 * <p>The job reads joined order/item records from the Kafka topic
 * {@code order-item-tmp}, sums {@code product_price * product_quantity} per
 * one-hour tumbling event-time window, brand and product name, ranks the
 * products inside each (window, brand) group by that sum, and writes the rows
 * with rank &lt;= 10 to a MySQL table through the JDBC connector.
 **/
public class BrandTop10Product_TOPN {
    /**
     * Registers the source and sink tables and submits the streaming INSERT job.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        // Execution environment: exactly-once checkpoints every 2000 ms, stored on the local file system.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointStorage("file:/d:/ckpt");
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Kafka source table holding the result of the order + order-item join.
        // order_time is the event-time attribute; the watermark equals order_time (no out-of-orderness allowance).
        // product_price carries an explicit precision/scale: a bare DECIMAL in Flink SQL is DECIMAL(10, 0),
        // which would round away the fractional part of every price while parsing the JSON.
        // NOTE(review): scale 2 is an assumption about the upstream data - confirm against the producer.
        tenv.executeSql(
                " CREATE TABLE tmp_kafka(             "
                        +"   order_id       BIGINT             "
                        +"  ,product_id    BIGINT              "
                        +"  ,product_name  STRING              "
                        +"  ,product_brand STRING              "
                        +"  ,product_quantity  INT             "
                        +"  ,product_price     DECIMAL(10, 2)  "
                        +"  ,order_time  timestamp(3)          "
                        +"  ,watermark for order_time as order_time"
                        + " ) WITH (                                           "
                        + "  'connector' = 'kafka',                            "
                        + "  'topic' = 'order-item-tmp',                       "
                        + "  'properties.bootstrap.servers' = 'doitedu:9092',  "
                        + "  'properties.group.id' = 'testGroup',              "
                        + "  'scan.startup.mode' = 'earliest-offset',          "
                        + "  'value.format'='json',                            "
                        + "  'value.json.fail-on-missing-field'='false',       "
                        + "  'value.fields-include' = 'EXCEPT_KEY')            ");

        // Logical sink table mapped onto the MySQL report table via the JDBC connector.
        // sale_amt is DECIMAL(38, 2) to match the type derived for SUM(DECIMAL(10, 2) * INT),
        // so the aggregated amount keeps its cents and is not narrowed on write.
        tenv.executeSql(
                " CREATE TABLE band_top_pd_sink (                   "
                        + "   window_start   timestamp(3),                    "
                        + "   window_end     timestamp(3),                    "
                        + "   brand_name     STRING,                          "
                        + "   product_name   STRING,                          "
                        + "   sale_amt       DECIMAL(38, 2),                  "
                        + "   rn             BIGINT                           "
                        + " ) WITH (                                          "
                        + "    'connector' = 'jdbc',                          "
                        + "    'url' = 'jdbc:mysql://doitedu:3306/rtrpt',     "
                        + "    'table-name' = 'rtrpt_brand_topn_item',        "
                        + "    'username' = 'root',                           "
                        + "    'password' = 'root'                            "
                        + " )                                                 "
        );


        // Run the report logic and write the result to the MySQL table read by the dashboard:
        //   o1: per one-hour tumbling window, brand and product name, sum price * quantity as sale_amt;
        //   o2: number the products of each (window, brand) group by sale_amt, highest first;
        //   outer query: keep the rows with rn <= 10.
        tenv.executeSql(
                " INSERT INTO band_top_pd_sink                                  "
                        +" SELECT                                                        "
                        +"    window_start                                               "
                        +"   ,window_end                                                 "
                        +"   ,product_brand as brand_name                                "
                        +"   ,product_name                                               "
                        +"   ,sale_amt                                                   "
                        +"   ,rn                                                         "
                        +" FROM (                                                        "
                        +" SELECT                                                        "
                        +"    window_start                                               "
                        +"   ,window_end                                                 "
                        +"   ,product_brand                                              "
                        +"   ,product_name                                               "
                        +"   ,sale_amt                                                   "
                        +"   ,row_number() over(                                         "
                        +"       partition by window_start,window_end,product_brand      "
                        +" 	  order by sale_amt desc) as rn                              "
                        +" FROM (                                                        "
                        +" SELECT                                                        "
                        +"   window_start                                                "
                        +"   ,window_end                                                 "
                        +"   ,product_brand                                              "
                        +"   ,product_name                                               "
                        +"   ,sum(product_price * product_quantity) as sale_amt          "
                        +" FROM TABLE(                                                   "
                        +"  TUMBLE(TABLE tmp_kafka, DESCRIPTOR(order_time),interval '1' hour)    "
                        +" )                                                             "
                        +" GROUP BY                                                      "
                        +" window_start,                                                 "
                        +" window_end,                                                   "
                        +" product_brand,                                                "
                        +" product_name ) o1                                             "
                        +" ) o2                                                          "
                        +" WHERE rn<=10                                                  "
        );

    }

}
